#!/usr/bin/env python3
# group: rw
#
# Test permissions taken by the mirror-top filter
#
# Copyright (C) 2021 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os
import iotests
from iotests import qemu_img

# Import qemu after iotests.py has amended sys.path
# pylint: disable=wrong-import-order
import qemu


image_size = 1 * 1024 * 1024
source = os.path.join(iotests.test_dir, 'source.img')


class TestMirrorTopPerms(iotests.QMPTestCase):
    def setUp(self):
        assert qemu_img('create', '-f', iotests.imgfmt, source,
                        str(image_size)) == 0
        self.vm = iotests.VM()
        self.vm.add_drive(source)
        self.vm.add_blockdev(f'null-co,node-name=null,size={image_size}')
        self.vm.launch()

        # Will be created by the test function itself
        self.vm_b = None

    def tearDown(self):
        try:
            self.vm.shutdown()
        except qemu.machine.AbnormalShutdown:
            pass

        if self.vm_b is not None:
            self.vm_b.shutdown()

        os.remove(source)

    def test_cancel(self):
        """
        Before commit 53431b9086b28, mirror-top used to not take any
        permissions but WRITE and share all permissions.  Because it
        is inserted between the source's original parents and the
        source, there generally was no parent that would have taken or
        unshared any permissions on the source, which means that an
        external process could access the image unhindered by locks.
        (Unless there was a parent above the protocol node that would
        take its own locks, e.g. a format driver.)
        This is bad enough, but if the mirror job is then cancelled,
        the mirroring VM tries to take back the image, restores the
        original permissions taken and unshared, and assumes this must
        just work.  But it will not, and so the VM aborts.

        Commit 53431b9086b28 made mirror keep the original permissions
        and so no other process can "steal" the image.

        (Note that you cannot really do the same with the target image
        and then completing the job, because the mirror job always
        took/unshared the correct permissions on the target.  For
        example, it does not share READ_CONSISTENT, which makes it
        difficult to let some other qemu process open the image.)
        """

        result = self.vm.qmp('blockdev-mirror',
                             job_id='mirror',
                             device='drive0',
                             target='null',
                             sync='full')
        self.assert_qmp(result, 'return', {})

        self.vm.event_wait('BLOCK_JOB_READY')

        # We want this to fail because the image cannot be locked.
        # If it does not fail, continue still and see what happens.
        self.vm_b = iotests.VM(path_suffix='b')
        # Must use -blockdev -device so we can use share-rw.
        # (And we need share-rw=on because mirror-top was always
        # forced to take the WRITE permission so it can write to the
        # source image.)
        self.vm_b.add_blockdev(f'file,node-name=drive0,filename={source}')
        self.vm_b.add_device('virtio-blk,drive=drive0,share-rw=on')
        try:
            self.vm_b.launch()
            print('ERROR: VM B launched successfully, this should not have '
                  'happened')
        except qemu.qmp.QMPConnectError:
            assert 'Is another process using the image' in self.vm_b.get_log()

        result = self.vm.qmp('block-job-cancel',
                             device='mirror')
        self.assert_qmp(result, 'return', {})

        self.vm.event_wait('BLOCK_JOB_COMPLETED')


if __name__ == '__main__':
    # No metadata format driver supported, because they would for
    # example always unshare the WRITE permission.  The raw driver
    # just passes through the permissions from the guest device, and
    # those are the permissions that we want to test.
    iotests.main(supported_fmts=['raw'],
                 supported_protocols=['file'])
